Semaphore
原文:https://zhuanlan.zhihu.com/p/27314456
Semaphore(信号量)是java.util.concurrent下的一个工具类.用来控制可同时访问特定资源的线程数.内部是通过维护父类(AQS)的 int state值实现.
Semaphore中有一个”许可”的概念:
访问特定资源前,先使用acquire(1)获得许可,如果许可数量为0,该线程则一直阻塞,直到有可用许可。
访问资源后,使用release()释放许可。
这个许可在构造时传入,赋给state值,它等同于state.
Semaphore应用场景
系统中某类资源比较紧张,只能被有限的线程访问,此时适合使用信号量。
Semaphore用来控制访问某资源的线程数,比如数据库连接.假设有这个的需求,读取几万个文件的数据到数据库中,由于文件读取是IO密集型任务,可以启动几十个线程并发读取,但是数据库连接数只有20个,这时就必须控制最多只有20个线程能够拿到数据库连接进行操作。这个时候,就可以使用Semaphore做流量控制。
使用案例:
spark LiveListenerBus类1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32private val eventLock = new Semaphore(0)
private val listenerThread = new Thread(name) {
setDaemon(true)
override def run(): Unit = Utils.tryOrStopSparkContext(sparkContext) {
LiveListenerBus.withinListenerThread.withValue(true) {
while (true) {
eventLock.acquire()
self.synchronized {
processingEvent = true
}
try {
// 消费消息
val event = eventQueue.poll
if (event == null) {
// Get out of the while loop and shutdown the daemon thread
if (!stopped.get) {
throw new IllegalStateException("Polling `null` from eventQueue means" +
" the listener bus has been stopped. So `stopped` must be true")
}
return
}
postToAll(event)
} finally {
self.synchronized {
processingEvent = false
}
}
}
}
}
}
Semaphore属于一种较常见的限流手段,Google Guava封装了一层。1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18//JDK API:流速控制在每秒执行100个任务
final Semaphore semaphore = new Semaphore(100);
void submitTasks(List<Runnable> tasks, Executor executor) {
for (Runnable task : tasks) {
semaphore.acquire(); // 也许需要等待
executor.execute(task);
semaphore.release();
}
}
//Google Guava API:流速控制在每秒执行100个任务
final RateLimiter rateLimiter = RateLimiter.create(100);
void submitTasks(List<Runnable> tasks, Executor executor) {
for (Runnable task : tasks) {
rateLimiter.acquire(); // 也许需要等待
executor.execute(task);
}
}